package conn

import (
	"gitee.com/ander888/tools/zlog"
	"github.com/streadway/amqp"
)

// MQURL amqp://user:password@host:port/vhost
// amqp://是固定参数，这个信息是固定不变的。后面两个是用户名密码ip地址端口号Virtual Host
// 如果vhost是“/”就输入/%2F，/%2F代表斜杠

// RabbitMQ rabbitMQ结构体
type RabbitMQ struct {
	conn      *amqp.Connection
	channel   *amqp.Channel
	QueueName string //队列名称
	Exchange  string //交换机名称
	Key       string //bind Key 名称
	MQUrl     string //连接信息
}

// NewRabbitMQ 创建结构体实例
func NewRabbitMQ(mqUrl, queueName, exchange, key string) *RabbitMQ {
	return &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, MQUrl: mqUrl}
}

// Destroy 断开channel 和 connection
func (r *RabbitMQ) Destroy() {
	err := r.channel.Close()
	if err != nil {
		zlog.Err(err, "rabbitMQ 销毁channel和链接会话失败")
		return
	}
	err = r.conn.Close()
	if err != nil {
		zlog.Err(err, "rabbitMQ 链接会话关闭失败")
	}
}

// 错误处理函数
func (r *RabbitMQ) failOnErr(err error, msg string) {
	if err != nil {
		zlog.ErrWithStr(err).Msg(msg)
	}
}

/* ---------------------------1.简单模式-----------------------------------------
特点：
生产者直接将消息发送到队列中，消费者从队列中获取消息并进行处理。
消息一旦被消费，就会从队列中移除。

真实应用场景：
电子邮件通知系统：生产者将待发送的邮件放入队列，消费者负责从队列中获取邮件并发送。

优点：
实现简单，易于理解和部署。
适用于单一消费者场景。

缺点：
没有消息持久化，一旦 RabbitMQ 服务器重启，队列中的消息将会丢失。
————————————————————————————————————————————————————————————————————————————————*/
// 创建简单模式下RabbitMQ实例 mqUrl = "amqp://user:pwd@ip:port/%2F"
func NewRabbitMQSimple(mqUrl, queueName string) *RabbitMQ {
	//创建RabbitMQ实例
	rbtMQ := NewRabbitMQ(mqUrl, queueName, "", "")
	var err error
	//获取connection
	rbtMQ.conn, err = amqp.Dial(rbtMQ.MQUrl)
	rbtMQ.failOnErr(err, "无法连接到rabbitMQ")
	//获取channel
	rbtMQ.channel, err = rbtMQ.conn.Channel()
	rbtMQ.failOnErr(err, "无法打开rabbitMQ的channel")
	return rbtMQ
}

// simple模式队列生产
func (r *RabbitMQ) PublishSimple(msg string) {
	errMsg := "发送消息:'" + msg + "'失败"
	//1.申请队列，如果队列不存在会自动创建，存在则跳过创建
	_, err := r.channel.QueueDeclare(
		r.QueueName, // queue:队列名称
		true,        // durable:是否持久化
		false,       // exclusive:是否独占即只能有一个消费者监听这个队列
		false,       // autoDelete:是否自动删除。当没有Consumer时，自动删除掉
		false,       // noWait:是否阻塞处理。true:不阻塞，false:阻塞
		nil,         // arguments:其他属性
	)
	if err != nil {
		r.failOnErr(err, errMsg)
		return
	}
	//调用channel 发送消息到队列中
	r.channel.Publish(
		r.Exchange,
		r.QueueName,
		false, //如果为true，根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
		false, //如果为true，当exchange发送消息到队列后发现队列上没有消费者，则会把消息返还给发送者
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(msg),
		})
}

// simple模式下消费者
func (r *RabbitMQ) ConsumeSimple() (msgs <-chan amqp.Delivery) {
	//1.申请队列，如果队列不存在会自动创建，存在则跳过创建
	q, err := r.channel.QueueDeclare(
		r.QueueName, //队列名称
		true,        //是否持久化
		false,       //是否自动删除
		false,       //是否具有排他性
		false,       //是否阻塞处理
		nil,         //额外的属性
	)
	if err != nil {
		r.failOnErr(err, "创建读取消息队列失败")
		return
	}
	//接收消息
	msgs, err = r.channel.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		r.failOnErr(err, "接收消息失败")
	}
	return
}

/* ---------------------------2.工作模式-----------------------------------------
特点：
多个消费者同时从同一个队列中获取消息并处理，每个消息只能被一个消费者处理。
消息在被消费者处理后会从队列中移除。

真实应用场景：
任务分发系统：多个消费者同时从队列中获取任务，每个消费者处理一个任务，以实现任务的负载均衡。

优点：
可以通过增加消费者来提高消息处理的并发性。

缺点：
没有消息持久化，一旦 RabbitMQ 服务器重启，队列中的消息将会丢失。
————————————————————————————————————————————————————————————————————————————————*/
// NewRabbitMQWork 创建work模式下RabbitMQ实例 mqUrl = "amqp://user:pwd@ip:port/%2F"
func NewRabbitMQWork(mqUrl, queueName string) *RabbitMQ {
	//创建RabbitMQ实例
	rbtMQ := NewRabbitMQ(mqUrl, queueName, "", "")
	var err error
	//获取connection
	rbtMQ.conn, err = amqp.Dial(rbtMQ.MQUrl)
	rbtMQ.failOnErr(err, "failed to connect rabb"+
		"itmq!")
	//获取channel
	rbtMQ.channel, err = rbtMQ.conn.Channel()
	rbtMQ.failOnErr(err, "无法打开rabbitMQ的channel")
	return rbtMQ
}

// PublishWork work模式队列生产
func (r *RabbitMQ) PublishWork(msg string) {
	errMsg := "发送消息:'" + msg + "'失败"
	//1.申请队列，如果队列不存在会自动创建，存在则跳过创建
	_, err := r.channel.QueueDeclare(r.QueueName, true, false, false, false, nil)
	if err != nil {
		r.failOnErr(err, errMsg)
		return
	}
	//2.调用channel 发送消息到队列中
	r.channel.Publish(r.Exchange, r.QueueName, false, false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(msg),
		})
}

// ConsumeWork work 模式下消费者
func (r *RabbitMQ) ConsumeWork() (msgs <-chan amqp.Delivery) {
	//1.申请队列，如果队列不存在会自动创建，存在则跳过创建
	q, err := r.channel.QueueDeclare(r.QueueName, true, false, false, false, nil)
	if err != nil {
		r.failOnErr(err, "无法创建读取消息队列")
		return
	}
	//接收消息
	msgs, err = r.channel.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		r.failOnErr(err, "无法消费消息")
	}
	return
}

/* ---------------------------3.订阅模式-----------------------------------------
特点：
生产者将消息发送到一个交换机中，交换机将消息广播给所有绑定了该交换机的队列。
每个消息都会被交换机广播给所有绑定了该交换机的队列。

真实应用场景：
新闻推送系统：生产者将最新的新闻发布到交换机中，每个订阅者（消费者）都会收到相同的新闻。

优点：
实现了消息的广播功能，适用于多个消费者同时接收相同消息的场景。

缺点：
无法根据消费者的兴趣或条件选择性地接收消息。
————————————————————————————————————————————————————————————————————————————————*/
// NewRabbitMQPub 创建Pub模式下RabbitMQ实例 mqUrl = "amqp://user:pwd@ip:port/%2F"
func NewRabbitMQPub(mqUrl, exchangeName string) *RabbitMQ {
	//创建RabbitMQ实例
	rbtMQ := NewRabbitMQ(mqUrl, "", exchangeName, "")
	var err error
	//获取connection
	rbtMQ.conn, err = amqp.Dial(rbtMQ.MQUrl)
	rbtMQ.failOnErr(err, "failed to connect rbtMQ!")
	//获取channel
	rbtMQ.channel, err = rbtMQ.conn.Channel()
	rbtMQ.failOnErr(err, "无法打开rabbitMQ的channel")
	return rbtMQ
}

// PublishPub Pub模式队列生产 交换机类型：fanout
func (r *RabbitMQ) PublishPub(msg string) {
	errMsg := "发送消息:'" + msg + "'失败"
	//1.尝试创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange, //交换机名称
		"fanout",   //交换机类型
		true,       //是否持久化
		false,      //是否自动删除
		false,      //true表示这个exchange不可以被client用来推送消息，仅用来进行exchange和exchange之间的绑定
		false,      //是否阻塞处理
		nil,        //其他属性
	)
	if err != nil {
		r.failOnErr(err, errMsg)
		return
	}

	//2.发送消息
	r.channel.Publish(
		r.Exchange, "",
		false, //如果为true，根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
		false, //如果为true，当exchange发送消息到队列后发现队列上没有消费者，则会把消息返还给发送者
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(msg),
		})
}

// ConsumePub pub 模式下消费者
func (r *RabbitMQ) ConsumePub() (msgs <-chan amqp.Delivery) {
	//1.尝试创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange, //交换机名称
		"fanout",   //交换机类型
		true,       //是否持久化
		false,      //是否自动删除
		false,      //true表示这个exchange不可以被client用来推送消息，仅用来进行exchange和exchange之间的绑定
		false,      //是否阻塞处理
		nil,        //其他属性
	)
	if err != nil {
		r.failOnErr(err, "Failed to declare an exchange")
		return
	}

	//2.试探性创建队列，这里注意队列名称不要写
	q, err := r.channel.QueueDeclare(
		"",    //随机生产队列名称
		false, //是否持久化
		false, //是否自动删除
		true,  //是否具有排他性
		false, //是否阻塞处理
		nil,   //其他参数
	)
	if err != nil {
		r.failOnErr(err, "Failed to declare a queue")
		return
	}

	//绑定队列到 exchange 中
	err = r.channel.QueueBind(
		q.Name,
		//在pub/sub模式下，这里的key要为空
		"", r.Exchange, false, nil)
	if err != nil {
		r.failOnErr(err, "绑定队列到 exchange 中失败")
		return
	}
	//接收消息
	msgs, err = r.channel.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		r.failOnErr(err, "无法消费消息")
	}
	return
}

/* ---------------------------4.路由模式-----------------------------------------
路由模式（Routing Mode）是消息队列中一种常见的消息路由模式，它允许消息的发送者（Producer）将消息发送到指定的交换机（Exchange），而交换机则根据消息的路由键（Routing Key）将消息路由到符合条件的队列（Queue）。路由模式提供了一种灵活的消息路由机制，使得消息的分发可以根据不同的路由规则进行定制。

优点：
灵活性高： 路由模式允许根据消息的属性或条件将消息路由到不同的队列或消费者，从而实现灵活的消息处理和路由策略。
解耦性强： 路由模式可以将消息的生产者和消费者解耦，使它们之间相互独立。生产者只需将消息发送到交换机，而不需要知道具体的消费者是谁，消费者也只需订阅感兴趣的消息，而不需要知道消息的生产者是谁，从而实现了系统的解耦。
灵活的消息过滤和路由： 路由模式允许根据消息的属性、标签或主题将消息路由到不同的队列或消费者，实现精确的消息过滤和路由，从而提高了系统的灵活性和可扩展性。
支持多种路由策略： 路由模式支持多种路由策略，如直连路由、主题路由、分发路由等，可以根据不同的业务需求选择合适的路由策略，从而满足不同的应用场景。

缺点：
配置复杂： 路由模式的配置相对复杂，需要定义交换机、队列和绑定关系，以及配置路由规则等，如果配置不当可能会导致消息路由错误或丢失。
性能损耗： 路由模式需要对消息进行额外的路由和过滤操作，可能会增加系统的消息处理延迟和性能消耗，特别是在消息量较大的情况下。
消息堆积风险： 如果路由模式配置不当或消息处理能力不足，可能会导致消息堆积的风险，从而影响系统的稳定性和可靠性。
维护成本高： 路由模式需要维护交换机、队列和绑定关系等配置信息，特别是在系统规模较大或消息路由策略较复杂的情况下，维护成本可能会较高。

使用场景：
微服务架构下的消息路由：在微服务架构中，不同的服务可能需要处理不同类型的消息。通过路由模式，可以将消息根据服务的需求进行路由，确保每个服务只接收到与其相关的消息，从而实现解耦和灵活性。
实时监控与日志分析：在监控系统或日志分析系统中，需要根据不同的指标或日志级别对数据进行分类和分析。路由模式可以根据数据的属性或标签将数据路由到不同的处理节点，以实现实时监控和日志分析。
任务调度与负载均衡：在任务调度系统中，任务可能具有不同的优先级或类型，需要根据任务的属性进行分发和调度。通过路由模式，可以将任务路由到不同的工作节点，实现任务的负载均衡和优先级调度。
多级消息过滤与订阅：在订阅发布系统中，订阅者可能对不同类型或主题的消息感兴趣。路由模式可以根据消息的属性或主题将消息路由到不同的订阅者，实现精确的消息过滤和订阅。
数据同步与复制：在分布式系统中，可能需要将数据复制到不同的节点或数据中心，以实现数据的备份和灾备。通过路由模式，可以将数据根据不同的条件进行路由和复制，确保数据的一致性和可靠性
————————————————————————————————————————————————————————————————————————————————*/
// NewRabbitMQRouting 路由模式 创建RabbitMQ实例 mqUrl = "amqp://user:pwd@ip:port/%2F"
func NewRabbitMQRouting(mqUrl, exchangeName, routingKey string) *RabbitMQ {
	//创建RabbitMQ实例
	rbtMQ := NewRabbitMQ(mqUrl, "", exchangeName, routingKey)
	var err error
	//获取connection
	rbtMQ.conn, err = amqp.Dial(rbtMQ.MQUrl)
	rbtMQ.failOnErr(err, "failed to connect rbtMQ!")
	//获取channel
	rbtMQ.channel, err = rbtMQ.conn.Channel()
	rbtMQ.failOnErr(err, "无法打开rabbitMQ的channel")
	return rbtMQ
}

// PublishRouting 路由模式发送消息 交换机类型：direct
func (r *RabbitMQ) PublishRouting(msg string) {
	errMsg := "发送消息:'" + msg + "'失败"
	//1.尝试创建交换机
	err := r.channel.ExchangeDeclare(r.Exchange, "direct", true,
		false, false, false, nil)
	if err != nil {
		r.failOnErr(err, errMsg)
		return
	}
	//2.发送消息
	r.channel.Publish(r.Exchange, r.Key, false, false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(msg),
		})
}

// ConsumeRouting routing 模式下消费者 交换机类型：direct
func (r *RabbitMQ) ConsumeRouting() (msgs <-chan amqp.Delivery) {
	//1.试探性创建交换机
	err := r.channel.ExchangeDeclare(r.Exchange, "direct", true, false, false, false, nil)
	if err != nil {
		r.failOnErr(err, "Failed to declare an exchange")
		return
	}

	//2.试探性创建队列，这里注意队列名称不要写
	q, err := r.channel.QueueDeclare("", false, false, true, false, nil)
	if err != nil {
		r.failOnErr(err, "Failed to declare a queue")
		return
	}

	//3.绑定队列到 exchange 中
	err = r.channel.QueueBind(q.Name, r.Key, r.Exchange, false, nil)
	if err != nil {
		r.failOnErr(err, "绑定队列到 exchange 中失败")
		return
	}

	//4.接收消息
	msgs, err = r.channel.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		r.failOnErr(err, "无法消费消息")
	}
	return
}

/* ---------------------------5.Topic模式-----------------------------------------
话题模式（Topic Mode）是消息队列中一种高级的消息路由模式，它基于消息的主题（Topic）进行消息的订阅和分发。在话题模式中，消息的发送者（Producer）将消息发送到特定的主题，而消息的接收者（Consumer）则根据自己感兴趣的主题进行订阅，从而接收相关的消息。

工作原理：
主题定义： 在话题模式中，主题由一个或多个单词组成，每个单词之间用点号（.）分隔，例如：stock.usd.nyse。
通配符匹配： 话题模式支持两种通配符，分别是*（星号）和#（井号）。
*：表示匹配一个单词，例如：stock.*.nyse可以匹配stock.usd.nyse和stock.eur.nyse等。
#：表示匹配零个或多个单词，例如：stock.usd.#可以匹配stock.usd.nyse、stock.usd.nasdaq以及stock.usd等。
消息路由： 消息的发送者在发送消息时指定一个主题，而消息的接收者则可以使用通配符来订阅感兴趣的主题。消息队列根据主题的匹配规则将消息路由到符合条件的订阅者。

应用场景：
多级消息过滤： 话题模式可以根据消息的主题进行多级的消息过滤和匹配，从而实现精确的消息路由和分发。
发布/订阅系统： 话题模式常用于发布/订阅系统中，其中消息的发送者作为发布者，向不同的主题发送消息，而消息的接收者作为订阅者，根据自己的需求订阅感兴趣的主题。
事件驱动架构： 话题模式可以用于构建事件驱动的架构，其中各个服务之间通过消息队列进行事件的发布和订阅，从而实现服务之间的解耦和灵活的消息传递。

优点：
灵活性高： 话题模式支持灵活的主题定义和通配符匹配，可以根据不同的业务需求实现精确的消息路由和分发。
解耦性强： 话题模式可以将消息的生产者和消费者解耦，使它们之间相互独立，从而提高系统的可维护性和可扩展性。

缺点：
配置复杂： 话题模式的配置相对复杂，需要定义主题和通配符规则，并确保发送者和接收者之间的匹配规则一致，否则可能导致消息路由错误或丢失。
性能损耗： 话题模式需要对消息进行额外的匹配和路由操作，可能会增加系统的消息处理延迟和性能消耗，特别是在消息量较大的情况下
————————————————————————————————————————————————————————————————————————————————*/
// NewRabbitMQTopic topic模式 创建RabbitMQ实例 mqUrl = "amqp://user:pwd@ip:port/%2F"
func NewRabbitMQTopic(mqUrl, exchangeName, routingKey string) *RabbitMQ {
	//创建RabbitMQ实例
	rbtMQ := NewRabbitMQ(mqUrl, "", exchangeName, routingKey)
	var err error
	//获取connection
	rbtMQ.conn, err = amqp.Dial(rbtMQ.MQUrl)
	rbtMQ.failOnErr(err, "failed to connect rbtMQ!")
	//获取channel
	rbtMQ.channel, err = rbtMQ.conn.Channel()
	rbtMQ.failOnErr(err, "无法打开rabbitMQ的channel")
	return rbtMQ
}

// PublishTopic 话题模式发送消息
func (r *RabbitMQ) PublishTopic(msg string) {
	errMsg := "发送消息:'" + msg + "'失败"
	//1.尝试创建交换机
	err := r.channel.ExchangeDeclare(r.Exchange, "topic", true,
		false, false, false, nil)
	if err != nil {
		r.failOnErr(err, errMsg)
		return
	}

	//2.发送消息
	r.channel.Publish(r.Exchange, r.Key, false, false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(msg),
		})
}

// ConsumeTopic topic 模式下消费者,交换机类型:topic
func (r *RabbitMQ) ConsumeTopic() (msgs <-chan amqp.Delivery) {
	//1.试探性创建交换机
	err := r.channel.ExchangeDeclare(r.Exchange, "topic", true, false, false, false, nil)
	if err != nil {
		r.failOnErr(err, "Failed to declare an exchange")
		return
	}

	//2.试探性创建队列，这里注意队列名称不要写
	q, err := r.channel.QueueDeclare("", false, false, true, false, nil)
	if err != nil {
		r.failOnErr(err, "Failed to declare a queue")
		return
	}

	//3.绑定队列到 exchange 中
	err = r.channel.QueueBind(q.Name, r.Key, r.Exchange, false, nil)
	if err != nil {
		r.failOnErr(err, "绑定队列到 exchange 中失败")
		return
	}

	//4.接收消息
	msgs, err = r.channel.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		r.failOnErr(err, "无法消费消息")
	}
	return
}
